/**
 * FileName: StreamingWindow
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/18 19:59
 * Description: 时间窗口，聚合
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;
import static org.apache.spark.sql.functions.window;

/**
 * Spark Structured Streaming job that reads records from Kafka, splits each record value into
 * columns, and counts how many times each phone number appears per 30-second window.
 *
 * <p>The aggregated table is printed to the console in complete output mode.
 */
public class StreamingWindow {

    /** Window length for the per-phone counts (only a window duration is given, so windows tumble). */
    private static final String WINDOW_DURATION = "30 seconds";

    /**
     * Builds the streaming pipeline, starts it and blocks until the query terminates.
     *
     * @param args unused
     * @throws IllegalStateException if a required Kafka setting is missing, or if the streaming
     *     query terminates with an error (the original {@link StreamingQueryException} is kept
     *     as the cause)
     */
    public static void main(String[] args) {
        // SparkSession is the entry point of Spark SQL and must exist before any Dataset is built.
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("StreamingWindow")
                .getOrCreate();
        try {
            Dataset<Row> kafkaDataset = readFromKafka(sparkSession);
            Dataset<Row> columnDataset = splitValueIntoColumns(kafkaDataset);
            Dataset<Row> result = countPhonesPerWindow(columnDataset);
            StreamingQuery query = startConsoleQuery(result);
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // Rethrow instead of just printing: swallowing the error would let main() return
            // normally and hide the failed query from whatever launched this job.
            throw new IllegalStateException("Streaming query terminated with an error", e);
        } finally {
            sparkSession.stop();
        }
    }

    /**
     * Creates a streaming Kafka source. Broker list, subscribed topics and starting offsets
     * come from {@link ConfigurationManager}.
     */
    private static Dataset<Row> readFromKafka(SparkSession sparkSession) {
        return sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", requireConfig(
                        ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS),
                        "kafka.bootstrap.servers"))
                .option("subscribe", requireConfig(
                        ConfigurationManager.getProperty(Constants.KAFKA_TOPICS),
                        "subscribe"))
                // NOTE(review): Spark's startingOffsets accepts "earliest", "latest" or a JSON
                // offset spec; confirm the configured auto-offset-reset value is one of those.
                .option("startingOffsets", requireConfig(
                        ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET),
                        "startingOffsets"))
                .load();
    }

    /**
     * Casts the Kafka {@code value} to a string, splits it with {@code ColumnsUtil.getRegex()}
     * and projects the split fields (as produced by {@code ColumnsUtil}) plus the Kafka record
     * {@code timestamp}.
     */
    private static Dataset<Row> splitValueIntoColumns(Dataset<Row> kafkaDataset) {
        return kafkaDataset
                .select(col("value").cast("string"), col("timestamp"))
                .withColumn("tmp", split(col("value"), ColumnsUtil.getRegex()))
                .select(ColumnsUtil.getInstance().combineColumns(
                        ColumnsUtil.getInstance().getColumns("tmp"), col("timestamp")))
                // Defensive: make sure the intermediate split array does not leak downstream.
                .drop(col("tmp"));
    }

    /**
     * Counts rows per phone number in {@link #WINDOW_DURATION} windows over {@code timestamp}.
     *
     * <p>NOTE(review): assumes {@code ColumnsUtil} yields a column named {@code phone}; confirm.
     * There is no watermark and the sink uses complete mode. Per the Spark docs, complete mode
     * keeps all aggregate state, so state grows for the lifetime of the query. Consider
     * {@code withWatermark} plus update/append mode for long-running deployments.
     */
    private static Dataset<Row> countPhonesPerWindow(Dataset<Row> columnDataset) {
        return columnDataset
                .groupBy(window(col("timestamp"), WINDOW_DURATION), col("phone"))
                .count();
    }

    /** Starts a query that prints the full aggregated table, untruncated, to the console. */
    private static StreamingQuery startConsoleQuery(Dataset<Row> result) {
        return result
                .writeStream()
                .format("console")
                .outputMode(OutputMode.Complete())
                .option("truncate", "false")
                .start();
    }

    /**
     * Returns {@code value} unchanged if it is non-blank; otherwise fails fast with a message
     * naming the Kafka option. Without this check a null would be handed to Spark as an option
     * value and would likely fail later with a less descriptive error.
     *
     * @param value the configured value, possibly {@code null}
     * @param optionName the Kafka source option the value is destined for (used in the message)
     * @return {@code value}
     * @throws IllegalStateException if {@code value} is null or blank
     */
    private static String requireConfig(String value, String optionName) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing required configuration for Kafka option '" + optionName + "'");
        }
        return value;
    }
}
